package org.adam2.sariel.job;

import org.adam2.sariel.Application;
import org.adam2.sariel.factory.ServiceFactory;
import org.adam2.sariel.model.Stock;
import org.adam2.sariel.service.BoardService;
import org.adam2.sariel.service.StockService;
import org.adam2.sariel.service.impl.BoardServiceImpl;
import org.adam2.sariel.service.impl.StockServiceImpl;
import org.adam2.sariel.util.DateUtil;
import org.adam2.sariel.util.IdUtil;
import org.adam2.sariel.util.MapReduceUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import java.io.IOException;
import java.math.BigInteger;
import java.util.*;

public class LoadStockJob extends Configured implements Tool {

    private static final String HDFS_URL = "hdfs://centos:9000";

    /**
     * map:处理输入文件，输出（单词 1）
     */
    public static class LoadStockMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable ints = new IntWritable(1);
        private Text keyword = new Text();

        private List<Stock> stockList = new ArrayList<Stock>();

        /**
         * BoardService服务类
         */
        private BoardService boardService = (BoardService) ServiceFactory.getInstance(BoardServiceImpl.class);

        /**
         * StockService服务类
         */
        private StockService stockService = (StockService) ServiceFactory.getInstance(StockServiceImpl.class);

        /**
         * 存储board表中的所有数据
         */
        private List<Map<String, Object>> boardList = boardService.find(null, null);

        /**
         * @param key
         * @param value
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            List<String> columnList = MapReduceUtil.getColumnList(key, value);
            if (null == columnList) {
                return;
            }
            Stock stock = new Stock();
            stock.setId(columnList.get(0).substring(2, columnList.get(0).length()));
            stock.setCode(columnList.get(0).substring(2, columnList.get(0).length()));
            stock.setName(columnList.get(1));
            stock.setTtmPeRatio((Double) MapReduceUtil.filterEmptyCell(columnList.get(2), Double.class));
            stock.setNetProfits((BigInteger) MapReduceUtil.filterEmptyCell(columnList.get(3), BigInteger.class));
            stock.setPriceBookRatio((Double) MapReduceUtil.filterEmptyCell(columnList.get(4), Double.class));
            stock.setTotalMarketCapitalization((BigInteger) MapReduceUtil.filterEmptyCell(columnList.get(5), BigInteger.class));
            stock.setCirculationMarketValue((BigInteger) MapReduceUtil.filterEmptyCell(columnList.get(6), BigInteger.class));
            // 此处可以改进：先将所有的board对象从hbase中取出来，再递归查找如name相同的boardId。
            if (null != boardList && boardList.size() > 0) {
                for (int i = 0; i < boardList.size(); i++) {
                    Map<String, Object> boardMap = boardList.get(i);
                    Set s = boardMap.entrySet();
                    Iterator it = s.iterator();
                    String infoAmount = it.next().toString().split("=")[1];
                    String infoName = it.next().toString().split("=")[1];
                    String rowName = it.next().toString().split("=")[1];
                    if (infoName.equals(columnList.get(7))) {
                        stock.setBoardId(rowName);
                    }
                }
            }
            stockList.add(stock);
        }

        /**
         * 每次初始化LoadStockReducer时，先清空stockList列表
         *
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            if (null != stockList && stockList.size() != 0) {
                stockList.clear();
            }
        }

        /**
         * 在reduce结束时，批量添加Stock对象
         *
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void cleanup(Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            if (null != stockList && stockList.size() > 0) {
                List<Mutation> mutationList = new ArrayList<Mutation>();
                for (Stock stock : stockList) {

                    Put p = new Put(Bytes.toBytes(stock.getId().toString()));
                    // 注意：Bytes.toBytes的参数必须是String类型，否则在HBase中会以16进制显示
                    p.add(Bytes.toBytes("info"), Bytes.toBytes("code"), Bytes.toBytes(stock.getCode().toString()));
                    p.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(stock.getName().toString()));
                    p.add(Bytes.toBytes("info"), Bytes.toBytes("ttm_pe_ratio"), Bytes.toBytes(MapReduceUtil.antiFilterEmptyCell(stock.getTtmPeRatio())));
                    p.add(Bytes.toBytes("info"), Bytes.toBytes("net_profits"), Bytes.toBytes(MapReduceUtil.antiFilterEmptyCell(stock.getNetProfits())));
                    p.add(Bytes.toBytes("info"), Bytes.toBytes("price_book_ratio"), Bytes.toBytes(MapReduceUtil.antiFilterEmptyCell(stock.getPriceBookRatio())));
                    p.add(Bytes.toBytes("info"), Bytes.toBytes("total_market_capitalization"), Bytes.toBytes(MapReduceUtil.antiFilterEmptyCell(stock.getTotalMarketCapitalization())));
                    p.add(Bytes.toBytes("info"), Bytes.toBytes("circulation_market_value"), Bytes.toBytes(MapReduceUtil.antiFilterEmptyCell(stock.getCirculationMarketValue())));
                    p.add(Bytes.toBytes("info"), Bytes.toBytes("board_id"), Bytes.toBytes(MapReduceUtil.antiFilterEmptyCell(stock.getBoardId())));
                    mutationList.add(p);
                }
                stockService.batchPut(mutationList);
            }
        }
    }

    /**
     * Reducer that adds up the {@link IntWritable} values received for a key and emits
     * {@code (key, sum)}.
     */
    public static class LoadStockReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        /** Reused output holder, overwritten on every {@code reduce} call. */
        private final IntWritable total = new IntWritable();

        /**
         * Sums all values of one key and writes the result to the context.
         *
         * @param key     key whose values are summed
         * @param value   values grouped under {@code key}
         * @param context task context that receives {@code (key, sum)}
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {
            int accumulated = 0;
            Iterator<IntWritable> counts = value.iterator();
            while (counts.hasNext()) {
                accumulated += counts.next().get();
            }
            total.set(accumulated);
            context.write(key, total);
        }
    }

    /**
     * Configures the stock-loading MapReduce job, submits it and blocks until it finishes.
     *
     * <p>The output path is deleted through {@code MapReduceUtil.deleteOutput} before the job is
     * created.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @return {@code 1} if the job completed successfully, {@code 0} otherwise.
     *         NOTE(review): this is the opposite of the usual exit-code convention
     *         (0 = success); it is kept unchanged because existing callers may rely on it
     * @throws IllegalArgumentException if fewer than two arguments are supplied
     * @throws IOException              if configuring or submitting the job fails
     * @throws ClassNotFoundException   declared by {@code Job.waitForCompletion}
     * @throws InterruptedException     if waiting for the job is interrupted
     */
    @Override
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (null == args || args.length < 2) {
            throw new IllegalArgumentException(
                    "Usage: " + this.getClass().getSimpleName() + " <input path> <output path>");
        }

        // Configuration injected through Configured.
        Configuration conf = super.getConf();

        // Remove the HDFS output path left over from a previous run.
        MapReduceUtil.deleteOutput(conf, HDFS_URL, args[1]);

        // Create the job.
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(Application.class);

        // Pipeline: Input --> Map --> Reduce --> Output
        // Input: FileInputFormat turns the file into <offset, line content> key/value pairs.
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // Map: mapper class and the key/value types it emits.
        job.setMapperClass(LoadStockMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reduce: reducer class and the final output key/value types.
        job.setReducerClass(LoadStockReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Output: where the job writes its result files.
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // Submit the job and wait for it to finish.
        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 1 : 0;
    }
}
